package com.jeesuite.monitor.channel;

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jeesuite.monitor.core.MonitorContext;
import com.jeesuite.monitor.core.UploadChannel;
import com.jeesuite.monitor.model.message.KafkaMessage;
import com.jeesuite.monitor.model.metaobject.Trace;

/**
 * {@link UploadChannel} implementation that publishes each {@link Trace} to a Kafka topic.
 *
 * <p>Configuration is read from {@code MonitorContext.getContext().getProperties()}:
 * <ul>
 *   <li>{@code kafka.servers} - value used for the producer's bootstrap servers setting</li>
 *   <li>{@code kafka.topic} - topic every message is sent to</li>
 * </ul>
 *
 * <p>{@link #start()} must be called before {@link #process(Trace)}.
 */
public class KafkaUploadChannel implements UploadChannel {

	private static final Logger LOG = LoggerFactory.getLogger(KafkaUploadChannel.class);

	private static final String CONFIG_TOPIC = "kafka.topic";
	private static final String CONFIG_SERVERS = "kafka.servers";

	private KafkaProducer<String, Object> kafkaProducer;

	private String topic;

	/**
	 * Reads the Kafka settings from the monitor context and creates the producer.
	 *
	 * @throws IllegalStateException if {@code kafka.servers} is not configured
	 */
	@Override
	public void start() {

		Map<String, String> monitorConfigs = MonitorContext.getContext().getProperties();
		topic = monitorConfigs.get(CONFIG_TOPIC);
		if (topic == null) {
			// Sending would fail on every record; surface the misconfiguration early in the log.
			LOG.error("Missing monitor config '{}'; messages cannot be sent until it is set", CONFIG_TOPIC);
		}

		String servers = monitorConfigs.get(CONFIG_SERVERS);
		if (servers == null) {
			// Properties.put rejects null values with a bare NullPointerException;
			// fail with a message that names the missing key instead.
			throw new IllegalStateException("Missing required monitor config '" + CONFIG_SERVERS + "'");
		}

		Properties configs = new Properties();
		configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
		configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MessageSerializer.class.getName());
		// Producer-level retries are disabled.
		configs.put(ProducerConfig.RETRIES_CONFIG, "0");
		configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

		this.kafkaProducer = new KafkaProducer<String, Object>(configs);
	}

	/**
	 * Closes the underlying producer. Does nothing when the producer was never
	 * created (for example {@link #start()} was not called or failed).
	 */
	@Override
	public void close() {
		if (kafkaProducer != null) {
			kafkaProducer.close();
		}
	}

	/**
	 * Wraps the trace in a {@link KafkaMessage} and sends it to the configured topic,
	 * keyed by the message id.
	 *
	 * @param trace the trace to upload
	 */
	@Override
	public void process(Trace trace) {
		KafkaMessage message = new KafkaMessage(trace);
		doSend(topic, message.getMsgId(), message);
	}

	/**
	 * Sends a message asynchronously; the outcome is only logged from the producer callback.
	 *
	 * @param topicName  destination topic
	 * @param messageKey record key (also logged as the message id)
	 * @param message    payload to send
	 */
	private void doSend(final String topicName, final String messageKey, final KafkaMessage message) {
		ProducerRecord<String, Object> record = new ProducerRecord<String, Object>(topicName, messageKey, message);
		// Asynchronous send: the callback is invoked once the send completes or fails.
		this.kafkaProducer.send(record, new Callback() {

			@Override
			public void onCompletion(RecordMetadata metadata, Exception ex) {
				if (ex != null) {
					// Trailing Throwable argument is logged with its stack trace by SLF4J.
					LOG.error("kafka_send_fail,topic={},messageId={}", topicName, messageKey, ex);
				} else {
					LOG.debug("kafka_send_success,topic={}, messageId={}, partition={}, offset={}",
							topicName, messageKey, metadata.partition(), metadata.offset());
				}
			}
		});
	}

}
